1   /*
2    * Copyright (C) 2011 The Guava Authors
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5    * in compliance with the License. You may obtain a copy of the License at
6    *
7    * http://www.apache.org/licenses/LICENSE-2.0
8    *
9    * Unless required by applicable law or agreed to in writing, software distributed under the License
10   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11   * or implied. See the License for the specific language governing permissions and limitations under
12   * the License.
13   */
14  
15  package com.google.common.collect;
16  
17  import com.google.common.annotations.Beta;
18  import com.google.common.base.Preconditions;
19  
20  import java.util.ArrayDeque;
21  import java.util.Collection;
22  import java.util.Deque;
23  import java.util.PriorityQueue;
24  import java.util.Queue;
25  import java.util.concurrent.ArrayBlockingQueue;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.LinkedBlockingDeque;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.PriorityBlockingQueue;
31  import java.util.concurrent.SynchronousQueue;
32  import java.util.concurrent.TimeUnit;
33  
34  /**
35   * Static utility methods pertaining to {@link Queue} and {@link Deque} instances.
36   * Also see this class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}.
37   *
38   * @author Kurt Alfred Kluever
39   * @since 11.0
40   */
41  public final class Queues {
42    private Queues() {}
43  
44    // ArrayBlockingQueue
45  
46    /**
47     * Creates an empty {@code ArrayBlockingQueue} with the given (fixed) capacity
48     * and nonfair access policy.
49     */
50    public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
51      return new ArrayBlockingQueue<E>(capacity);
52    }
53  
54    // ArrayDeque
55  
56    /**
57     * Creates an empty {@code ArrayDeque}.
58     *
59     * @since 12.0
60     */
61    public static <E> ArrayDeque<E> newArrayDeque() {
62      return new ArrayDeque<E>();
63    }
64  
65    /**
66     * Creates an {@code ArrayDeque} containing the elements of the specified iterable,
67     * in the order they are returned by the iterable's iterator.
68     *
69     * @since 12.0
70     */
71    public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) {
72      if (elements instanceof Collection) {
73        return new ArrayDeque<E>(Collections2.cast(elements));
74      }
75      ArrayDeque<E> deque = new ArrayDeque<E>();
76      Iterables.addAll(deque, elements);
77      return deque;
78    }
79  
80    // ConcurrentLinkedQueue
81  
82    /**
83     * Creates an empty {@code ConcurrentLinkedQueue}.
84     */
85    public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
86      return new ConcurrentLinkedQueue<E>();
87    }
88  
89    /**
90     * Creates a {@code ConcurrentLinkedQueue} containing the elements of the specified iterable,
91     * in the order they are returned by the iterable's iterator.
92     */
93    public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
94        Iterable<? extends E> elements) {
95      if (elements instanceof Collection) {
96        return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
97      }
98      ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
99      Iterables.addAll(queue, elements);
100     return queue;
101   }
102 
103   // LinkedBlockingDeque
104 
105   /**
106    * Creates an empty {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE}.
107    *
108    * @since 12.0
109    */
110   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() {
111     return new LinkedBlockingDeque<E>();
112   }
113 
114   /**
115    * Creates an empty {@code LinkedBlockingDeque} with the given (fixed) capacity.
116    *
117    * @throws IllegalArgumentException if {@code capacity} is less than 1
118    * @since 12.0
119    */
120   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) {
121     return new LinkedBlockingDeque<E>(capacity);
122   }
123 
124   /**
125    * Creates a {@code LinkedBlockingDeque} with a capacity of {@link Integer#MAX_VALUE},
126    * containing the elements of the specified iterable,
127    * in the order they are returned by the iterable's iterator.
128    *
129    * @since 12.0
130    */
131   public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) {
132     if (elements instanceof Collection) {
133       return new LinkedBlockingDeque<E>(Collections2.cast(elements));
134     }
135     LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>();
136     Iterables.addAll(deque, elements);
137     return deque;
138   }
139 
140   // LinkedBlockingQueue
141 
142   /**
143    * Creates an empty {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE}.
144    */
145   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
146     return new LinkedBlockingQueue<E>();
147   }
148 
149   /**
150    * Creates an empty {@code LinkedBlockingQueue} with the given (fixed) capacity.
151    *
152    * @throws IllegalArgumentException if {@code capacity} is less than 1
153    */
154   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
155     return new LinkedBlockingQueue<E>(capacity);
156   }
157 
158   /**
159    * Creates a {@code LinkedBlockingQueue} with a capacity of {@link Integer#MAX_VALUE},
160    * containing the elements of the specified iterable,
161    * in the order they are returned by the iterable's iterator.
162    *
163    * @param elements the elements that the queue should contain, in order
164    * @return a new {@code LinkedBlockingQueue} containing those elements
165    */
166   public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
167     if (elements instanceof Collection) {
168       return new LinkedBlockingQueue<E>(Collections2.cast(elements));
169     }
170     LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
171     Iterables.addAll(queue, elements);
172     return queue;
173   }
174 
175   // LinkedList: see {@link com.google.common.collect.Lists}
176 
177   // PriorityBlockingQueue
178 
179   /**
180    * Creates an empty {@code PriorityBlockingQueue} with the ordering given by its
181    * elements' natural ordering.
182    *
183    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
184    */
185   public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
186     return new PriorityBlockingQueue<E>();
187   }
188 
189   /**
190    * Creates a {@code PriorityBlockingQueue} containing the given elements.
191    *
192    * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
193    * this priority queue will be ordered according to the same ordering.
194    *
195    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
196    */
197   public static <E extends Comparable> PriorityBlockingQueue<E> newPriorityBlockingQueue(
198       Iterable<? extends E> elements) {
199     if (elements instanceof Collection) {
200       return new PriorityBlockingQueue<E>(Collections2.cast(elements));
201     }
202     PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
203     Iterables.addAll(queue, elements);
204     return queue;
205   }
206 
207   // PriorityQueue
208 
209   /**
210    * Creates an empty {@code PriorityQueue} with the ordering given by its
211    * elements' natural ordering.
212    *
213    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
214    */
215   public static <E extends Comparable> PriorityQueue<E> newPriorityQueue() {
216     return new PriorityQueue<E>();
217   }
218 
219   /**
220    * Creates a {@code PriorityQueue} containing the given elements.
221    *
222    * <b>Note:</b> If the specified iterable is a {@code SortedSet} or a {@code PriorityQueue},
223    * this priority queue will be ordered according to the same ordering.
224    *
225    * @since 11.0 (requires that {@code E} be {@code Comparable} since 15.0).
226    */
227   public static <E extends Comparable> PriorityQueue<E> newPriorityQueue(
228       Iterable<? extends E> elements) {
229     if (elements instanceof Collection) {
230       return new PriorityQueue<E>(Collections2.cast(elements));
231     }
232     PriorityQueue<E> queue = new PriorityQueue<E>();
233     Iterables.addAll(queue, elements);
234     return queue;
235   }
236 
237   // SynchronousQueue
238 
239   /**
240    * Creates an empty {@code SynchronousQueue} with nonfair access policy.
241    */
242   public static <E> SynchronousQueue<E> newSynchronousQueue() {
243     return new SynchronousQueue<E>();
244   }
245 
246   /**
247    * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
248    * {@code numElements} elements are not available, it will wait for them up to the specified
249    * timeout.
250    *
251    * @param q the blocking queue to be drained
252    * @param buffer where to add the transferred elements
253    * @param numElements the number of elements to be waited for
254    * @param timeout how long to wait before giving up, in units of {@code unit}
255    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
256    * @return the number of elements transferred
257    * @throws InterruptedException if interrupted while waiting
258    */
259   @Beta
260   public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
261       long timeout, TimeUnit unit) throws InterruptedException {
262     Preconditions.checkNotNull(buffer);
263     /*
264      * This code performs one System.nanoTime() more than necessary, and in return, the time to
265      * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
266      * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
267      */
268     long deadline = System.nanoTime() + unit.toNanos(timeout);
269     int added = 0;
270     while (added < numElements) {
271       // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
272       // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
273       added += q.drainTo(buffer, numElements - added);
274       if (added < numElements) { // not enough elements immediately available; will have to poll
275         E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
276         if (e == null) {
277           break; // we already waited enough, and there are no more elements in sight
278         }
279         buffer.add(e);
280         added++;
281       }
282     }
283     return added;
284   }
285   
286   /**
287    * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)}, 
288    * but with a different behavior in case it is interrupted while waiting. In that case, the 
289    * operation will continue as usual, and in the end the thread's interruption status will be set 
290    * (no {@code InterruptedException} is thrown). 
291    * 
292    * @param q the blocking queue to be drained
293    * @param buffer where to add the transferred elements
294    * @param numElements the number of elements to be waited for
295    * @param timeout how long to wait before giving up, in units of {@code unit}
296    * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
297    * @return the number of elements transferred
298    */
299   @Beta
300   public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer, 
301       int numElements, long timeout, TimeUnit unit) {
302     Preconditions.checkNotNull(buffer);
303     long deadline = System.nanoTime() + unit.toNanos(timeout);
304     int added = 0;
305     boolean interrupted = false;
306     try {
307       while (added < numElements) {
308         // we could rely solely on #poll, but #drainTo might be more efficient when there are 
309         // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
310         added += q.drainTo(buffer, numElements - added);
311         if (added < numElements) { // not enough elements immediately available; will have to poll
312           E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
313           while (true) {
314             try {
315               e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
316               break;
317             } catch (InterruptedException ex) {
318               interrupted = true; // note interruption and retry
319             }
320           }
321           if (e == null) {
322             break; // we already waited enough, and there are no more elements in sight
323           }
324           buffer.add(e);
325           added++;
326         }
327       }
328     } finally {
329       if (interrupted) {
330         Thread.currentThread().interrupt();
331       }
332     }
333     return added;
334   }
335 
336   /**
337    * Returns a synchronized (thread-safe) queue backed by the specified queue. In order to
338    * guarantee serial access, it is critical that <b>all</b> access to the backing queue is
339    * accomplished through the returned queue.
340    *
341    * <p>It is imperative that the user manually synchronize on the returned queue when accessing
342    * the queue's iterator: <pre>   {@code
343    *
344    *   Queue<E> queue = Queues.synchronizedQueue(MinMaxPriorityQueue.<E>create());
345    *   ...
346    *   queue.add(element);  // Needn't be in synchronized block
347    *   ...
348    *   synchronized (queue) {  // Must synchronize on queue!
349    *     Iterator<E> i = queue.iterator(); // Must be in synchronized block
350    *     while (i.hasNext()) {
351    *       foo(i.next());
352    *     }
353    *   }}</pre>
354    *
355    * <p>Failure to follow this advice may result in non-deterministic behavior.
356    *
357    * <p>The returned queue will be serializable if the specified queue is serializable.
358    *
359    * @param queue the queue to be wrapped in a synchronized view
360    * @return a synchronized view of the specified queue
361    * @since 14.0
362    */
363   public static <E> Queue<E> synchronizedQueue(Queue<E> queue) {
364     return Synchronized.queue(queue, null);
365   }
366 
367   /**
368    * Returns a synchronized (thread-safe) deque backed by the specified deque. In order to
369    * guarantee serial access, it is critical that <b>all</b> access to the backing deque is
370    * accomplished through the returned deque.
371    *
372    * <p>It is imperative that the user manually synchronize on the returned deque when accessing
373    * any of the deque's iterators: <pre>   {@code
374    *
375    *   Deque<E> deque = Queues.synchronizedDeque(Queues.<E>newArrayDeque());
376    *   ...
377    *   deque.add(element);  // Needn't be in synchronized block
378    *   ...
379    *   synchronized (deque) {  // Must synchronize on deque!
380    *     Iterator<E> i = deque.iterator(); // Must be in synchronized block
381    *     while (i.hasNext()) {
382    *       foo(i.next());
383    *     }
384    *   }}</pre>
385    *
386    * <p>Failure to follow this advice may result in non-deterministic behavior.
387    *
388    * <p>The returned deque will be serializable if the specified deque is serializable.
389    *
390    * @param deque the deque to be wrapped in a synchronized view
391    * @return a synchronized view of the specified deque
392    * @since 15.0
393    */
394   public static <E> Deque<E> synchronizedDeque(Deque<E> deque) {
395     return Synchronized.deque(deque, null);
396   }
397 }